package org.voovan.korla.socket;

import org.voovan.korla.handler.ProviderHandler;
import org.voovan.korla.message.Callback;
import org.voovan.korla.message.Msg;
import org.voovan.network.IoSession;
import org.voovan.network.exception.SendMessageException;
import org.voovan.network.filter.ByteFilter;
import org.voovan.network.messagesplitter.ByteMessageSplitter;
import org.voovan.network.tcp.TcpServerSocket;
import org.voovan.tools.TObject;
import org.voovan.tools.log.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

import static org.voovan.korla.KorlaStatic.DEFAULT_CHANNEL;
import static org.voovan.tools.TObject.nullDefault;

/**
 * Korla 服务类
 *
 * @author: helyho
 * korla Framework.
 * WebSite: https://github.com/helyho/korla
 * Licence: Apache v2 License
 */
public class KorlaProvider {
    public final ConcurrentHashMap<String, SessionCache> sessionCaches = new ConcurrentHashMap<String, SessionCache>();

    private String host;
    private int port;
    private int readTimeout;
    private int sendTimeout;
    private Function<Object, Integer> sliceAlgorithm; //分片算法

    private transient TcpServerSocket tcpServerSocket;
    private List<String> allowAddress;
    private transient SocketHandler socketHandler = new SocketHandler();
    private transient ProviderHandler providerHandler;

    /**
     *
     * @param host      监听地址
     * @param port		监听端口
     * @param readTimeout   超时时间, 单位: 毫秒
     * @param sendTimeout 发超时时间, 单位: 毫秒
     * @throws IOException	IO异常
     */
    public KorlaProvider(String host, int port, int readTimeout, int sendTimeout) throws IOException {
        this.host = host;
        this.port = port;
        this.readTimeout = readTimeout;
        this.sendTimeout = sendTimeout;

        allowAddress = new ArrayList<String>();
        allowAddress.add(host.trim());
        tcpServerSocket = new TcpServerSocket(host, port, readTimeout, sendTimeout, 1);
        tcpServerSocket.messageSplitter(new ByteMessageSplitter());

        tcpServerSocket.filterChain().add(new ByteFilter());
        tcpServerSocket.filterChain().add(new KorlaFilter());
        tcpServerSocket.handler(socketHandler);
        providerHandler = new ProviderHandler(this);
        socketHandler.setHandler(providerHandler);
    }

    public TcpServerSocket getTcpServerSocket() {
        return tcpServerSocket;
    }

    public String getHost() {
        return host;
    }

    public int getPort() {
        return port;
    }

    public int getReadTimeout() {
        return readTimeout;
    }

    public int getSendTimeout() {
        return sendTimeout;
    }

    /**
     * 获取允许连接的 ip 地址集合
     * @return ip 地址结合
     */
    public List<String> getAllowAddress() {
        return allowAddress;
    }

    /**
     * 设置允许连接的 ip 地址集合
     * @param allowAddress ip 地址集合
     */
    public void setAllowAddress(List<String> allowAddress) {
        this.allowAddress = allowAddress;
    }

    public Function<Object, Integer> getSliceAlgorithm() {
        return sliceAlgorithm;
    }

    public void setSliceAlgorithm(Function<Object, Integer> sliceAlgorithm) {
        this.sliceAlgorithm = sliceAlgorithm;
    }

    /**
     * 启动监听
     * @throws IOException IO异常
     */
    public void listen() throws IOException {
        tcpServerSocket.syncStart();
        Logger.info("KorlaProvider listen: " + host + ":" + port);
    }

    /**
     * 是否处于监听状态
     * @return true: 监听中, false: 未监听
     */
    public boolean isListening(){
        return tcpServerSocket.isConnected();
    }

    /**
     * 向默认通道增加一个回调
     * @param callback 回调对象
     */
    public void addCallback(Callback callback) {
       addCallback(null, callback);
    }

    /**
     * 向指定通道增加一个回调
     * @param channel 通道名称
     * @param callback 回调对象
     */
    public void addCallback(String channel, Callback callback) {
        channel = TObject.nullDefault(channel, DEFAULT_CHANNEL);
        providerHandler.addCallback(channel, callback);
    }

    /**
     * 获取指定通道的回调
     * @param channel 通道名
     * @return 回调对象
     */
    public Callback getCallback(String channel) {
        return providerHandler.getCallback(channel);
    }

    /**
     * 消息重试
     * @param channel 通道名称
     * @param msg 被重试的消息
     * @return 相应消息
     */
    public void retry(String channel, Msg msg){
        channel = TObject.nullDefault(channel, DEFAULT_CHANNEL);
        Callback callback = providerHandler.getCallback(channel);
        Msg result = callback.execute(msg);
        randomSend(channel, result);
    }

    /**
     * 消息重试
     * @param msg 被重试的消息
     * @return 相应消息
     */
    public void retry(Msg msg){
        retry(null, msg);
    }

    public void bindSession(String channel, IoSession session) {
        getOrCreateSessionCache(channel).put(session);
    }

    /**
     * 获取或者创建命名通道的 Session 集合
     * @param channel 通道名
     * @return SessionCache 对象
     */
    private SessionCache getOrCreateSessionCache(String channel){
        SessionCache sessionCache = sessionCaches.get(channel);
        if(sessionCache==null) {
            sessionCache = new SessionCache(channel);
            sessionCaches.put(channel, sessionCache);
        }

        return sessionCache;
    }

    /**
     * 对指定通道广播消息
     * @param channel 通道名称
     * @param msg 消息
     */
    public void broadcast(String channel, Msg msg) {
        channel = nullDefault(channel, DEFAULT_CHANNEL);
        for(IoSession ioSession : sessionCaches.get(channel).getSessions()) {
            if(ioSession.isConnected()) {
                try {
                    ioSession.syncSend(msg);
                } catch (SendMessageException e) {
                    Logger.error("[Korla] KorlaProvid.broadcast has error", e);
                }
            }
        }
    }

    /**
     * 对默认通道广播消息
     * @param msg 消息
     */
    public void broadcast(Msg msg) {
        broadcast(null, msg);
    }

    /**
     * 从指定通道随机选择连接发送消息
     * @param channel 通道名称
     * @param msg 消息
     * @return true: 发送成功, false: 发送失败
     */
    public boolean randomSend(String channel, Msg msg) {
        channel = nullDefault(channel, DEFAULT_CHANNEL);
        IoSession ioSession = sessionCaches.get(channel).getRandom();

        return send(ioSession, msg);
    }


    /**
     * 从指定的Sesson, 发送消息
     * @param session IoSession 会像
     * @param msg 消息
     * @return true: 发送成功, false: 发送失败
     */
    public boolean send(IoSession session, Msg msg) {
        if(session.isConnected()) {
            try {
                session.syncSend(msg);
                return true;
            } catch (SendMessageException e) {
                return false;
            }
        }

        return false;
    }

    /**
     * 从默认通道随机选择连接发送消息
     * @param msg 消息
     * @return true: 发送成功, false: 发送失败
     */
    public boolean randomSend(Msg msg) {
        return randomSend(null, msg);
    }

    /**
     * 从指定通道轮训选择连接发送消息
     * @param channel 通道名称
     * @param msg 消息
     * @return true: 发送成功, false: 发送失败
     */
    public boolean loopSend(String channel, Msg msg) {
        channel = nullDefault(channel, DEFAULT_CHANNEL);
        IoSession ioSession = sessionCaches.get(channel).getLoopNext();

        return send(ioSession, msg);
    }

    /**
     * 从默认通道轮训选择连接发送消息
     * @param msg 消息
     * @return true: 发送成功, false: 发送失败
     */
    public boolean loopSend(Msg msg) {
        return loopSend(null, msg);
    }

    /**
     * 关闭连接
     */
    public void close(){
        tcpServerSocket.close();
    }

}
